{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Getting Started with RxPY\n",
    "\n",
    "[ReactiveX](http://reactivex.io), or Rx for short, is an API for programming with observable event streams. RxPY is a port of ReactiveX to Python. Learning Rx with Python is particularly interesting since Python removes much of the clutter that comes with statically typed languages. RxPY works with both Python 2 and Python 3 but all examples in this tutorial uses [Python 3.4](http://www.python.org)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Rx is about processing streams of events. With Rx you:\n",
    "\n",
    "*  Tell what you want to process (Observable)\n",
    "*  How you want to process it (A composition of operators)\n",
    "*  What you want to do with the result (Observer)\n",
    "\n",
    "It's important to understand that with Rx you describe what you want to do with events if and when they arrive. It's all a declarative composition of operators that will do some processing the events when they arrive. If nothing happens, then nothing is processed.\n",
    "\n",
    "Thus the pattern is that you `subscribe` to an `Observable` using an `Observer`:\n",
    "\n",
    "```python\n",
    "subscription = Observable.subscribe(observer)\n",
    "```\n",
    "\n",
    "***NOTE:*** Observables are not active in themselves. They need to be subscribed to make something happen. Simply having an Observable lying around doesn't make anything happen."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Install\n",
    "\n",
    "Use `pip` to install RxPY:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Requirement already satisfied (use --upgrade to upgrade): rx in /Users/dbrattli/GitHub/RxPY\n"
     ]
    }
   ],
   "source": [
    "%%bash\n",
    "pip install rx"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Importing the Rx module"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import rx\n",
    "from rx import Observable, Observer"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Generating a sequence\n",
    "\n",
    "There are many ways to generate a sequence of events. The easiest way to get started is to use the `from_iterable()` operator that is also called just `from_`. Other operators you may use to generate a sequence such as `just`, `generate`, `create` and `range`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Got: 0\n",
      "Got: 1\n",
      "Got: 2\n",
      "Got: 3\n",
      "Got: 4\n",
      "Got: 5\n",
      "Got: 6\n",
      "Got: 7\n",
      "Got: 8\n",
      "Got: 9\n",
      "Sequence completed\n"
     ]
    }
   ],
   "source": [
    "class MyObserver(Observer):\n",
    "    def on_next(self, x):\n",
    "        print(\"Got: %s\" % x)\n",
    "        \n",
    "    def on_error(self, e):\n",
    "        print(\"Got error: %s\" % e)\n",
    "        \n",
    "    def on_completed(self):\n",
    "        print(\"Sequence completed\")\n",
    "\n",
    "xs = Observable.from_iterable(range(10))\n",
    "d = xs.subscribe(MyObserver())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0\n",
      "1\n",
      "2\n",
      "3\n",
      "4\n",
      "5\n",
      "6\n",
      "7\n",
      "8\n",
      "9\n"
     ]
    }
   ],
   "source": [
    "xs = Observable.from_(range(10))\n",
    "d = xs.subscribe(print)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**NOTE:** The subscribe method takes an observer, or one to three callbacks for handing `on_next()`, `on_error()`, and `on_completed()`. This is why we can use `print` directly as the observer in the example above, since it becomes the `on_next()` handler for an anonymous observer. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Filtering a sequence"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {
    "collapsed": false,
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "1\n",
      "3\n",
      "5\n",
      "7\n",
      "9\n"
     ]
    }
   ],
   "source": [
    "xs = Observable.from_(range(10))\n",
    "d = xs.filter(\n",
    "        lambda x: x % 2\n",
    "    ).subscribe(print)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Transforming a sequence"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0\n",
      "2\n",
      "4\n",
      "6\n",
      "8\n",
      "10\n",
      "12\n",
      "14\n",
      "16\n",
      "18\n"
     ]
    }
   ],
   "source": [
    "xs = Observable.from_(range(10))\n",
    "d = xs.map(\n",
    "        lambda x: x * 2\n",
    "    ).subscribe(print)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**NOTE: ** You can also take an index as the second parameter to the mapper function:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "0: 20\n",
      "1: 24\n",
      "2: 28\n",
      "3: 32\n",
      "4: 36\n"
     ]
    }
   ],
   "source": [
    "xs = Observable.from_(range(10, 20, 2))\n",
    "d = xs.map(\n",
    "        lambda x, i: \"%s: %s\" % (i, x * 2)\n",
    "    ).subscribe(print)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Merge\n",
    "\n",
    "Merging two observable sequences into a single observable sequence using the `merge` operator:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "a\n",
      "1\n",
      "b\n",
      "2\n",
      "c\n",
      "3\n",
      "d\n",
      "4\n",
      "e\n",
      "5\n"
     ]
    }
   ],
   "source": [
    "xs = Observable.range(1, 5)\n",
    "ys = Observable.from_(\"abcde\")\n",
    "zs = xs.merge(ys).subscribe(print)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## The Spacetime of Rx\n",
    "\n",
    "In the examples above all the events happen at the same moment in time. The events are only separated by ordering. This confuses many newcomers to Rx since the result of the `merge` operation above may have several valid results such as:\n",
    "\n",
    "    a1b2c3d4e5\n",
    "    1a2b3c4d5e\n",
    "    ab12cd34e5\n",
    "    abcde12345\n",
    "    \n",
    "The only guarantee you have is that 1 will be before 2 in `xs`, but 1 in `xs` can be before or after `a` in `ys`. It's up the the sort stability of the scheduler to decide which event should go first. For real time data streams this will not be a problem since the events will be separated by actual time. To make sure you get the results you \"expect\", it's always a good idea to add some time between the events when playing with Rx."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Marbles and Marble Diagrams\n",
    "\n",
    "As we saw in the previous section it's nice to add some time when playing with Rx and RxPY. A great way to explore RxPY is to use the `marbles` test module that enables us to play with [marble diagrams](http://rxmarbles.com). The marbles module adds two new extension methods to `Observable`. The methods are `from_marbles()` and `to_marbles()`.\n",
    "\n",
    "Examples:\n",
    "1.  `res = rx.Observable.from_marbles(\"1-2-3-|\")`\n",
    "2.  `res = rx.Observable.from_marbles(\"1-2-3-x\", rx.Scheduler.timeout)`\n",
    "\n",
    "The marble string consists of some special characters:\n",
    "\n",
    "```\n",
    "    - = Timespan of 100 ms\n",
    "    x = on_error()\n",
    "    | = on_completed()\n",
    "```\n",
    "\n",
    "All other characters are treated as an `on_next()` event at the given moment they are found on the string. If you need to represent multi character values, then you can group then with brackets such as \"1-(42)-3\". \n",
    "\n",
    "Lets try it out:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'a-b-c-|'"
      ]
     },
     "execution_count": 8,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from rx.testing import marbles\n",
    "\n",
    "xs = Observable.from_marbles(\"a-b-c-|\")\n",
    "xs.to_blocking().to_marbles()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": false
   },
   "source": [
    "It's now easy to also add errors into the even stream by inserting `x` into the marble string:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'11-22-33-4x'"
      ]
     },
     "execution_count": 9,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "xs = Observable.from_marbles(\"1-2-3-x-5\")\n",
    "ys = Observable.from_marbles(\"1-2-3-4-5\")\n",
    "xs.merge(ys).to_blocking().to_marbles()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Subjects and Streams\n",
    "\n",
    "A simple way to create an observable stream is to use a subject. It's probably called a subject after the Subject-Observer pattern described in the [Design Patterns](http://www.amazon.com/Design-Patterns-Elements-Reusable-Object-Oriented/dp/0201633612/ref=sr_1_1?s=books&ie=UTF8&qid=1431184351&sr=1-1&keywords=design+patterns) book by the gang of four (GOF).\n",
    "\n",
    "Anyway, a Subject is both an `Observable` and an `Observer`, so you can both subscribe to it and `on_next` it with events. This makes it an obvious candidate if need to publish values into an observable stream for processing:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Got: 42\n"
     ]
    }
   ],
   "source": [
    "from rx.subjects import Subject\n",
    "\n",
    "stream = Subject()\n",
    "stream.on_next(41)\n",
    "\n",
    "d = stream.subscribe(lambda x: print(\"Got: %s\" % x))\n",
    "\n",
    "stream.on_next(42)\n",
    "\n",
    "d.dispose()\n",
    "stream.on_next(43)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "*That's all for now*"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.4.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
